/*
 *  Filename:lsv_log_gc_fullgc.c
 *  Description:
 *
 *  Created on: 2017-06-05
 *  Author: Asdf(825674301)
 */

#include <time.h>

#include "volume_proto.h"
#include "core.h"

#include "lsv_conf.h"
#include "lsv_help.h"
#include "lsv_log.h"
#include "lsv_wbuffer.h"
#include "lsv_volume.h"
#include "lsv_gc_internal.h"

/*
 * Run one round of full GC for a volume.
 *
 * Takes up to LSV_GC_FULLGC_SIZE chunk ids from the old-storage area (the
 * in-memory ready_chunk record when old_storage->chunk_count is 0, otherwise
 * the record read from the volume at old_storage->gc_chkid), updates the
 * old-storage bookkeeping, hands the ids to the check queue in batches and
 * finally calls lsv_gc(lsv_info, LSV_GC_TYPE_LOOP).
 *
 * If the check queue is already over its limit, no chunk is taken and only
 * lsv_gc() is called.
 *
 * Returns:
 *   ENODATA (positive)  old storage holds no chunk to collect;
 *   a negative value    allocating a buffer or reading the storage record
 *                       failed; in that case no chunk id has been consumed
 *                       and lsv_gc() is not called;
 *   otherwise           whatever lsv_gc() returns.
 */
int lsv_gc_fullgc(lsv_volume_proto_t *lsv_info) {
        int rc = 0;
        lsv_log_info_t *log_info = NULL;
        lsv_gc_context_t *gc_context = NULL;
        lsv_gc_old_storage_t *old_storage;
        lsv_gc_old_storage_record_t *_record = NULL, *record = NULL;
        /* fullgc_chunk_count starts at 0 so it is never read uninitialized. */
        lsv_u32_t *fullgc_chunk_id_array = NULL, fullgc_chunk_count = 0, chunk_count;
        lsv_u32_t i;

        log_info = lsv_info->log_info;
        gc_context = log_info->gc_context;
        old_storage = &gc_context->old_storage;

        // If the check queue is already over its limit, do not feed it any
        // more chunks; just run lsv_gc().
        if (gc_context->check_queue.count > LSV_GC_FULLGC_EXIT_CKECK_QUEUE_SIZE_MUL * gc_context->check_queue_size) {
                DWARN("check_queue if full,don't fullgc.check_queue.count %u\n", gc_context->check_queue.count);
                goto out;
        }

        // Take a number of chunk ids from a storage chunk or from ready_chunk
        // and submit them to the check queue.
        // old_storage->gc_chkid records the storage chunk currently being processed.

        // fullgc source: pick the record the chunk ids are taken from
        lsv_lock(&old_storage->lock);
        if (0 == old_storage->chunk_count) {
                if (0 == old_storage->ready_chunk.count) {
                        lsv_unlock(&old_storage->lock);
                        DINFO("no need fullgc\n");
                        return ENODATA;
                }
                // No storage chunk: consume the in-memory record in place.
                record = &old_storage->ready_chunk;
                DINFO("fullgc ready_chunk,count:%u\n", record->count);
        } else {
                // Load the record stored at gc_chkid into a temporary buffer.
                _record = (lsv_gc_old_storage_record_t *) malloc(sizeof(lsv_gc_old_storage_record_t));
                if (NULL == _record) {
                        rc = -ENOMEM;
                        DERROR("malloc lsv_loggc_old_storage_record_t failed,errno:%d\n", rc);
                        goto err_unlock;
                }
                rc = lsv_volume_chunk_read(lsv_info, old_storage->gc_chkid, (lsv_s8_t *)_record);
                if (rc < 0) {
                        DERROR("read volume err,err:%d,chunk_id:%u\n", rc, old_storage->gc_chkid);
                        goto err_unlock;
                }
                record = _record;
                DINFO("fullgc storage_chunk,count:%u\n", record->count);
        }

        // fullgc chunk: pop ids from the tail of the record.
        // The allocation is checked before anything is consumed so that a
        // failure leaves the record and old_storage untouched.
        fullgc_chunk_id_array = (lsv_u32_t *) malloc(sizeof(lsv_u32_t) * LSV_GC_FULLGC_SIZE);
        if (NULL == fullgc_chunk_id_array) {
                rc = -ENOMEM;
                DERROR("malloc fullgc_chunk_id_array failed,errno:%d\n", rc);
                goto err_unlock;
        }
        for (i = 0; record->count > 0 && i < LSV_GC_FULLGC_SIZE; i++) {
                fullgc_chunk_id_array[fullgc_chunk_count++] = record->storage_chkid[--record->count];
                DINFO("fullgc a chunk, chunk_id:%u\n", fullgc_chunk_id_array[fullgc_chunk_count-1]);
        }

        // fullgc meta
        old_storage->all_gc_value = old_storage->all_gc_value >> 1;
        if (0 == old_storage->chunk_count) {
                // no storage chunk: ready_chunk was already updated in place
        } else {
                if (record->count == 0) {
                        // The storage chunk is drained: release it and move on
                        // to the next one in the chain.
                        // NOTE(review): the result of lsv_volume_chunk_free() is ignored.
                        lsv_volume_chunk_free(lsv_info, LSV_GC_STORAGE_TYPE, old_storage->gc_chkid);
                        old_storage->gc_chkid = record->next_chkid;
                        old_storage->chunk_count--;
                        DINFO("free old_storage->chunk_count:%u\n", old_storage->chunk_count);
                } else {
                        // Write the decremented record->count back to the volume.
                        // Only sizeof(lsv_u32_t) bytes at offset 0 are updated,
                        // which presumably is where the count field lives - TODO confirm.
                        // NOTE(review): this uses LSV_BITMAP_STORAGE_TYPE while the
                        // free above uses LSV_GC_STORAGE_TYPE - confirm which is intended.
                        DINFO("update storage_chunk,count:%u\n", record->count);
                        lsv_volume_io_t vio;
                        lsv_volume_io_init(&vio, old_storage->gc_chkid, 0, sizeof(lsv_u32_t), LSV_BITMAP_STORAGE_TYPE);
                        rc = lsv_volume_chunk_update(lsv_info, &vio, (lsv_s8_t *)record);
                        if (rc < 0) {
                                // TODO retry instead of asserting
                                YASSERT(0);
                        }
                }
        }

        // TODO
        lsv_gc_old_storage_head_set(lsv_info);

        free(_record);
        lsv_unlock(&old_storage->lock);

        // Hand the collected chunk ids to the check queue, at most
        // LSV_GC_CHECK_QUEUE_FULLGC_ADD_TASK_MAX ids per call.
        DINFO("add to check_queue. chunk_count:%u\n", fullgc_chunk_count);
        for (i = 0; i < fullgc_chunk_count; i+=LSV_GC_CHECK_QUEUE_FULLGC_ADD_TASK_MAX) {
                chunk_count = fullgc_chunk_count - i;
                chunk_count = _min(chunk_count, LSV_GC_CHECK_QUEUE_FULLGC_ADD_TASK_MAX);
                lsv_gc_add_to_check_queue_batch(lsv_info,
                                                    chunk_count,
                                                    fullgc_chunk_id_array+i,
                                                    LSV_CHECK_QUEUE_ADD_TYPE_TAIL);
        }
        // NOTE(review): fullgc_chunk_id_array is never freed in this function.
        // If lsv_gc_add_to_check_queue_batch() copies the ids (it cannot free
        // the interior pointers it is given), the array leaks on every call
        // and should be freed here - confirm against the callee.

        DINFO("lsv_log_fullgc create a gc.\n");
out:
        rc = lsv_gc(lsv_info, LSV_GC_TYPE_LOOP);
        return rc;

err_unlock:
        // Failure while old_storage->lock is held and before any chunk id was
        // consumed: release what was acquired and report the error.
        free(_record);
        lsv_unlock(&old_storage->lock);
        return rc;
}

/*
 * Callback handed to core_request() by __do_fullgc().
 *
 * Extracts the single lsv_volume_proto_t * argument from the va_list and
 * calls lsv_gc_fullgc_offer() on it.
 *
 * Returns 0 on success, otherwise the non-zero value returned by
 * lsv_gc_fullgc_offer().
 */
static int __do_fullgc2(va_list ap) {
        int ret;
        lsv_volume_proto_t *lsv_info;

        lsv_info = va_arg(ap, lsv_volume_proto_t *);
        va_end(ap);

        ret = lsv_gc_fullgc_offer(lsv_info);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Callback registered with the full-GC timer in lsv_gc_fullgc_start_timer().
 *
 * arg is the lsv_volume_proto_t * of the volume. The actual work is
 * forwarded through core_request() to __do_fullgc2(), using the core selected
 * by core_hash() of the volume's chkid.
 *
 * Returns 0 on success, otherwise the non-zero value returned by
 * core_request().
 */
static int __do_fullgc(void *arg) {
        lsv_volume_proto_t *lsv_info = arg;
        chkid_t *volid = &lsv_info->volume_proto->chkid;
        int ret;

        DINFO("from timer, fullgc vol %s\n", id2str(volid));

        // This callback is NOT running on a core thread and therefore cannot
        // run coroutine code itself; hand the job over via core_request().
        ret = core_request(core_hash(volid), -1, "lsv_log_fullgc", __do_fullgc2, lsv_info);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Set up the periodic full-GC timer of a volume.
 *
 * When LSV_FEATURE_GC is set in lsv_feature, initializes
 * gc_context->fullgc_timer via easy_timer_init() with __do_fullgc as the
 * callback, lsv_info as its argument and an interval of
 * LSV_GC_FULLGC_TIMER_INTERVAL * USEC_PER_SEC. When the feature bit is not
 * set, nothing is done.
 *
 * Returns 0 on success (or when the feature is off), otherwise the non-zero
 * value returned by easy_timer_init().
 */
int lsv_gc_fullgc_start_timer(lsv_volume_proto_t *lsv_info) {
        lsv_log_info_t *log_info = lsv_info->log_info;
        lsv_gc_context_t *gc_context = log_info->gc_context;
        int ret;

        // GC feature disabled: no timer is needed.
        if (!(lsv_feature & LSV_FEATURE_GC)) {
                return 0;
        }

        // NOTE(review): the meaning of the trailing -1 and 0 arguments is
        // defined by easy_timer_init() and is not visible from this file.
        ret = easy_timer_init(&gc_context->fullgc_timer,
                              "fullgc_timer",
                              __do_fullgc,
                              lsv_info,
                              LSV_GC_FULLGC_TIMER_INTERVAL * USEC_PER_SEC,
                              -1,
                              0);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Decide whether a full GC should be offered for the volume.
 *
 * Compares the number of chunks currently accounted for by the GC
 * bookkeeping with a threshold derived from the volume's logical size, and
 * calls lsv_gc_fullgc_offer() when the former exceeds the latter.
 *
 * Always returns 0; the result of lsv_gc_fullgc_offer() is not propagated.
 */
int lsv_gc_fullgc_check(lsv_volume_proto_t *lsv_info) {
        volume_proto_t *lich_volume_proto = lsv_info->volume_proto;
        lsv_log_info_t *log_info = lsv_info->log_info;
        lsv_gc_context_t *gc_context = log_info->gc_context;
        lsv_gc_old_storage_t *old_storage = &gc_context->old_storage;
        lsv_wbuf_t *wbuf = lsv_info->wbuf;
        lsv_u32_t log_chunk_num, vol_check_num, wbuffer_chunk_num, rcache_chunk_num;
        double vol_chunk_num;

        // Actual value: chunks in ready_chunk, plus LSV_GC_OS_CHUNK_NUM per
        // old-storage chunk, plus the entries of logctrl_tab.
        log_chunk_num = old_storage->ready_chunk.count +
                        old_storage->chunk_count * LSV_GC_OS_CHUNK_NUM +
                        gc_context->logctrl_tab->num_of_entries;

        // Theoretical value: logical size of the volume expressed in chunks.
        vol_chunk_num = lich_volume_proto->table1.fileinfo.logical_size / LSV_CHUNK_SIZE;

        // Chunks attributed to the write buffer (WAL free + wait segments).
        wbuffer_chunk_num = (wbuf->wal->free_list.count + wbuf->wal->wait_list.count) * WAL_SEGMENT_CHUNK_NUM;
        rcache_chunk_num = 4096;

        // Threshold, per the original author's breakdown:
        // vol_check_size = data + meta_data + snapshot + volume + write_buff + fullgc_size
        // (in units of LSV_CHUNK_SIZE, MB)
        // NOTE(review): which literal (1, 64, 1) maps to which term is not
        // spelled out in this file.
        vol_check_num = vol_chunk_num / (1 - LSV_GC_DIRTY_RATIO) +
                        1 +
                        64 +
                        1 +
                        vol_chunk_num/LSV_METADATA_RATIO +
                        wbuffer_chunk_num +
                        rcache_chunk_num +
                        LSV_GC_FULLGC_SIZE;

        DINFO("fullgc stat info <%u,%.2lf> check %u\n", log_chunk_num, vol_chunk_num, vol_check_num);

        // Within the threshold: nothing to do.
        if (log_chunk_num <= vol_check_num) {
                return 0;
        }

        DINFO("offer a fullgc\n");
        lsv_gc_fullgc_offer(lsv_info);

        return 0;
}
